package sync

import (
	"context"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	es "gitee.com/yrwy/msgo/pkg/errors"
)

// Task is a unit of work submitted to a GoPool. The pool invokes it with the
// pool's own context, which is cancelled when the pool is closed.
type Task func(context.Context)

// GoPoolStatus is an alias for int32.
// NOTE(review): presumably meant to name the values of GoPool.status
// (0 = idle, 1 = working); it is not referenced in this part of the file.
type GoPoolStatus = int32

// GoPool is a goroutine pool made of one main goroutine (see run) plus worker
// goroutines that are started on demand, up to cap.
//
// ctx/cancel signal shutdown to every goroutine, wg is waited on by Close, and
// once ensures the shutdown sequence in Close runs a single time. The mutex m
// is held wherever workers, wc and lastTime are modified.
type GoPool struct {
	ctx      context.Context
	cancel   context.CancelFunc
	m        sync.Mutex
	wg       sync.WaitGroup
	once     sync.Once
	tasks    chan Task     // buffered queue of pending tasks
	cap      int           // maximum number of goroutines allowed
	ttl      time.Duration // how long a goroutine may stay idle
	workers  []chan Task   // stack of idle goroutines (their task channels)
	wc       int           // number of goroutines currently working
	idle     int           // minimum number of idle goroutines to keep
	lastTime time.Time     // time of the most recent activity
	status   int32         // main goroutine state: 0 = idle, 1 = working
}

// GoPoolOption customises a GoPool. Options are applied by NewGoPool, in the
// order given, on top of the pool's default settings.
type GoPoolOption func(*GoPool)

// MinGoroutine returns an option that sets the minimum number of idle
// goroutines the pool keeps when it trims its idle workers. A value of n that
// is not positive is ignored and leaves the current setting untouched.
func MinGoroutine(n int) GoPoolOption {
	if n <= 0 {
		// Nothing to apply: hand back an option that does nothing.
		return func(*GoPool) {}
	}
	return func(pool *GoPool) {
		pool.idle = n
	}
}

// MaxGoroutine returns an option that sets the maximum number of goroutines
// the pool may start. The value is stored exactly as given; NewGoPool replaces
// it with runtime.NumCPU()*100 when it is <= 0 or at/above that limit.
func MaxGoroutine(n int) GoPoolOption {
	apply := func(pool *GoPool) {
		pool.cap = n
	}
	return apply
}

// GoLifetime returns an option that sets how long goroutines are kept before
// the pool considers trimming idle ones. The duration is stored exactly as
// given.
func GoLifetime(n time.Duration) GoPoolOption {
	var opt GoPoolOption = func(pool *GoPool) {
		pool.ttl = n
	}
	return opt
}

// NewGoPool creates a pool, applies opts in order and starts the pool's main
// goroutine.
//
// Defaults before options are applied: at most runtime.NumCPU()*50
// goroutines, a 10 second idle lifetime, and runtime.NumCPU() idle goroutines
// kept alive. After the options run:
//   - a goroutine limit that is <= 0 or >= runtime.NumCPU()*100 is replaced by
//     runtime.NumCPU()*100;
//   - a non-positive lifetime is replaced by the 10 second default, because a
//     zero or negative duration would make the main goroutine's timer fire
//     back-to-back and spin;
//   - nil options are skipped instead of panicking.
//
// The task queue is buffered to hold as many tasks as the goroutine limit.
// Call Close to stop the pool.
func NewGoPool(opts ...GoPoolOption) *GoPool {
	const defaultTTL = 10 * time.Second
	maxCap := runtime.NumCPU() * 100

	ctx, cancel := context.WithCancel(context.Background())
	p := &GoPool{
		ctx:    ctx,
		cancel: cancel,
		cap:    runtime.NumCPU() * 50,
		ttl:    defaultTTL,
		idle:   runtime.NumCPU(),
	}
	for _, opt := range opts {
		if opt != nil {
			opt(p)
		}
	}
	if p.cap <= 0 || p.cap >= maxCap {
		p.cap = maxCap
	}
	if p.ttl <= 0 {
		p.ttl = defaultTTL
	}
	p.tasks = make(chan Task, p.cap) // buffers at most cap pending tasks
	p.workers = make([]chan Task, 0, p.cap)
	go p.run() // start the main goroutine
	return p
}

// IsWorking reports whether any task is currently being executed, either by
// the main goroutine (status == 1) or by at least one worker (wc > 0).
//
// wc is modified under p.m by getWorker and pushIdle, so it is read under the
// same mutex here to avoid a data race; status is read atomically.
func (p *GoPool) IsWorking() bool {
	if atomic.LoadInt32(&p.status) == 1 {
		return true
	}
	p.m.Lock()
	defer p.m.Unlock()
	return p.wc > 0
}

// Go submits f for execution. When no goroutine can take it right away the
// task is placed in the pool's task queue, which blocks the caller while that
// queue is full.
//
// It returns es.ErrClosed when the pool's context is already done, nil when f
// is nil (nothing is submitted), and es.ErrRecoverPanic when a panic is
// recovered during the hand-off (for example a send on a channel that was
// closed in the meantime).
func (p *GoPool) Go(f Task) (e error) {
	defer func() {
		if r := recover(); r != nil {
			e = es.ErrRecoverPanic
		}
	}()

	select {
	case <-p.ctx.Done():
		return es.ErrClosed
	default:
	}
	if f == nil {
		return nil
	}

	target := p.getWorker()
	if target == nil {
		// Every goroutine is busy: fall back to the task queue.
		target = p.tasks
	}
	target <- f
	return nil
}

// MustGo submits f for execution and, when no goroutine can take it, runs f
// synchronously on the caller's goroutine with the pool's context.
//
// It returns es.ErrClosed when the pool's context is already done, nil when f
// is nil (nothing runs), and es.ErrRecoverPanic when a panic is recovered,
// which includes a panic raised by f itself when it runs inline.
func (p *GoPool) MustGo(f Task) (e error) {
	defer func() {
		if r := recover(); r != nil {
			e = es.ErrRecoverPanic
		}
	}()

	select {
	case <-p.ctx.Done():
		return es.ErrClosed
	default:
	}
	if f == nil {
		return nil
	}

	if wk := p.getWorker(); wk != nil {
		wk <- f
		return nil
	}
	// No goroutine available: execute on the calling goroutine.
	f(p.ctx)
	return nil
}

// TryGo attempts to hand f to a goroutine and reports whether the hand-off
// happened. It returns false when f is nil, when the pool's context is done,
// when getWorker has no channel to offer, or when a panic is recovered during
// the hand-off.
func (p *GoPool) TryGo(f Task) (ok bool) {
	defer func() {
		if r := recover(); r != nil {
			ok = false
		}
	}()

	if f == nil {
		return false
	}
	select {
	case <-p.ctx.Done():
		return false
	default:
	}

	wk := p.getWorker()
	if wk == nil {
		return false
	}
	wk <- f
	return true
}

// Close shuts the pool down: on the first call it cancels the pool's context
// and closes the task queue, and on every call it waits for the goroutines
// tracked by the pool's WaitGroup. Any panic raised along the way is recovered
// and discarded.
func (p *GoPool) Close() {
	defer func() {
		_ = recover()
	}()

	shutdown := func() {
		p.cancel()
		close(p.tasks)
	}
	p.once.Do(shutdown)

	p.wg.Wait() // wait for every goroutine to exit
}

// getWorker picks the channel a new task should be sent to.
//
// When the task queue is empty and the main goroutine is idle it returns the
// task queue itself, so the main goroutine runs the task. Otherwise, under
// p.m, it pops the most recently parked idle worker, or starts a new worker
// while fewer than cap are busy. In both of those cases wc is incremented. It
// returns nil when the goroutine limit has been reached.
func (p *GoPool) getWorker() chan Task {
	if queueEmpty := len(p.tasks) == 0; queueEmpty && atomic.LoadInt32(&p.status) == 0 {
		return p.tasks // let the main goroutine take the task
	}

	p.m.Lock()
	defer p.m.Unlock()

	switch n := len(p.workers); {
	case n > 0:
		// Reuse the idle worker on top of the stack.
		top := p.workers[n-1]
		p.workers = p.workers[:n-1]
		p.wc++
		return top
	case p.wc < p.cap:
		// Room for one more goroutine: start it.
		fresh := make(chan Task)
		go p.worker(fresh)
		p.wc++
		return fresh
	default:
		return nil // goroutine limit reached
	}
}

// run is the body of the pool's main goroutine. It executes tasks taken from
// the task queue and trims idle workers, and it returns once the task queue
// has been closed.
//
// Trimming happens in two places:
//   - before each task it runs, it retires one idle worker if more than idle
//     are parked and nothing has changed for longer than ttl;
//   - whenever the ttl timer fires, it retires every parked worker beyond the
//     first idle ones.
//
// A worker is retired by closing its channel. workers and lastTime are
// modified under p.m by other goroutines, so every read of them here is made
// inside the LockGuard callback rather than before it; reading them outside
// the lock is a data race.
//
// NOTE(review): LockGuard is defined elsewhere; this assumes it runs the
// callback while holding the given mutex.
// NOTE(review): wg.Add is executed inside this goroutine, so a Close that
// runs immediately after NewGoPool can reach wg.Wait before the Add.
func (p *GoPool) run() {
	p.wg.Add(1)
	defer p.wg.Done()
	timer := time.NewTimer(p.ttl)
	defer timer.Stop()
	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				return // the task queue was closed
			}
			if p.ctx.Err() != nil {
				continue // shutting down: queued tasks are dropped
			}
			func() {
				atomic.StoreInt32(&p.status, 1)
				defer atomic.StoreInt32(&p.status, 0)
				timer.Reset(p.ttl) // push back the next periodic trim
				LockGuard(&p.m, func() {
					// Retire at most one surplus idle worker per task.
					if time.Since(p.lastTime) > p.ttl && len(p.workers) > p.idle {
						last := len(p.workers) - 1
						close(p.workers[last]) // tells the worker to exit
						p.workers = p.workers[:last]
					}
				})
				task(p.ctx)
			}()
		case <-timer.C:
			// Periodic trim: keep only idle parked workers.
			LockGuard(&p.m, func() {
				if p.ctx.Err() != nil {
					return // only trim while the pool is still open
				}
				size := len(p.workers)
				if size <= p.idle {
					return
				}
				for i := p.idle; i < size; i++ {
					close(p.workers[i]) // tells the worker to exit
				}
				p.workers = p.workers[:p.idle]
				p.lastTime = time.Now() // record the time of the last change
			})
			timer.Reset(p.ttl) // trim again after another ttl
		}
	}
}

// worker is the body of a pooled goroutine that receives its tasks on wk.
//
// For every task received on wk it runs the task, then keeps running tasks
// from the shared task queue until that queue is momentarily empty, and
// finally parks itself again through pushIdle. It exits when the pool's
// context is done, when the shared task queue is closed, or when wk is closed
// by someone else. In the first two cases the worker closes wk itself; in the
// last case it must not close wk a second time.
func (p *GoPool) worker(wk chan Task) {
	p.wg.Add(1)
	defer p.wg.Done()

	closedExternally := false
	defer func() {
		if !closedExternally {
			close(wk)
		}
	}()

	for {
		select {
		case <-p.ctx.Done():
			return
		case first, open := <-wk:
			if !open {
				// wk was closed from outside: just leave.
				closedExternally = true
				return
			}
			first(p.ctx)

			// Help out with queued tasks while there are any.
		drain:
			for {
				select {
				case queued, more := <-p.tasks:
					if !more {
						return
					}
					queued(p.ctx)
				default:
					break drain
				}
			}

			// Done for now: go back onto the idle stack.
			p.pushIdle(wk)
		}
	}
}

// pushIdle parks a worker: under p.m it lowers the count of working
// goroutines by one, records the current time as the latest change, and
// pushes wk onto the stack of idle workers.
func (p *GoPool) pushIdle(wk chan Task) {
	p.m.Lock()
	defer p.m.Unlock()

	p.wc--                  // one fewer goroutine at work
	p.lastTime = time.Now() // remember when the pool last changed
	p.workers = append(p.workers, wk)
}
